package cn.icanci.loopstack.rec.engine.sdk.server;

import cn.icanci.loopstack.rec.engine.sdk.exception.ValidatorException;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author icanci
 * @since 1.0 Created in 2022/11/20 18:17
 */
public class RecNettyServer {

    private static final Logger             logger    = LoggerFactory.getLogger(RecNettyServer.class);

    private static final int                CORE_SIZE = Runtime.getRuntime().availableProcessors();

    private static final ThreadPoolExecutor POOL      = new ThreadPoolExecutor(CORE_SIZE,             //
        CORE_SIZE << 1,                                                                               //
        60L,                                                                                          //
        TimeUnit.SECONDS,                                                                             //
        new LinkedBlockingQueue<>(2000),                                                              //
        runnable -> new Thread(runnable, "RECServerThread Pool-" + runnable.hashCode()),              //
        (r, executor) -> {
            throw new RuntimeException("RECServerThread Pool is EXHAUSTED!");
        });

    private static RegisterServer           registerServer;

    public static void setRegisterService(RegisterServer registerServer) {
        RecNettyServer.registerServer = registerServer;
    }

    /**
     * 需要将本SDK所在的客户端注册到服务端
     *
     * @param serverAddress 服务端ip地址
     * @param serverPort 服务端端口
     * @param clientPort 客户端端口
     * @param appName 客户端服务名
     */
    public static void startClient(String serverAddress, int serverPort, int clientPort, String appName) {
        if (StringUtils.isBlank(appName)) {
            throw new ValidatorException("App Name cannot be empty");
        }
        // 启动时候注册
        startClient0(serverAddress, serverPort, clientPort, appName);
        // 自动进行注册
        // 项目启动了，但是没有配置域信息，此时注册失败的，如果不自动注册，则需要进行重启才能注册
        autoRegister(serverAddress, serverPort, clientPort, appName);
    }

    private static void startClient0(String serverAddress, int serverPort, int clientPort, String appName) {
        Thread recThread = new Thread(() -> {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();

            ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap.group(bossGroup, workerGroup) //
                .channel(NioServerSocketChannel.class) //
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));
                        pipeline.addLast(new HttpServerCodec());
                        pipeline.addLast(new HttpObjectAggregator(5 * 1024 * 1024));
                        pipeline.addLast(new RecNettyServerHandler(POOL));
                    }
                }).childOption(ChannelOption.SO_KEEPALIVE, true);

            try {
                ChannelFuture future = bootstrap.bind(clientPort).sync();

                doRegistry(serverAddress, serverPort, clientPort, appName);

                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                logger.info("REC remoting server interruptedException", e);
            } catch (Exception e) {
                logger.info("REC remoting server error", e);
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }

        });
        recThread.setDaemon(true);
        recThread.start();
    }

    /**
     * 将SDK所在服务注册到注册中心
     *
     * @param serverAddress 服务端ip地址
     * @param serverPort 服务端端口
     * @param clientPort 客户端端口
     * @param appName 客户端服务名
     */
    private static void doRegistry(String serverAddress, int serverPort, int clientPort, String appName) {
        registerServer.register(serverAddress, serverPort, clientPort, appName);
    }

    /**
     * 自动注册
     *
     * @param serverAddress 服务端ip地址
     * @param serverPort 服务端端口
     * @param clientPort 客户端端口
     * @param appName 客户端服务名
     */
    private static void autoRegister(String serverAddress, int serverPort, int clientPort, String appName) {
        Thread autoRegisterThread = new Thread(() -> {
            // 每120秒刷新注册一次
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(120));
            doRegistry(serverAddress, serverPort, clientPort, appName);
        });
        autoRegisterThread.setDaemon(true);
        autoRegisterThread.start();
    }
}
